package com.bigdata.wsr.filter;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFilterFunction;

/**
 * Flink filter that keeps only the records belonging to a single table.
 *
 * <p>Each incoming record is a JSON string. Its {@code dbTable} field is split on
 * {@code '.'} and the second segment is compared with the configured table name
 * (presumably the field has the form {@code database.table} -- confirm against the
 * upstream producer).
 *
 * <p>Records that cannot match -- a {@code null} or empty payload, a missing
 * {@code dbTable} field, or a {@code dbTable} value without a second segment -- are
 * filtered out instead of failing the job. Text that is not valid JSON still makes
 * the parser throw, exactly as before.
 *
 * @author rui.wang
 * @date 2022/11/08
 */
@Slf4j
public class TableDataFilter extends RichFilterFunction<String> {
    /** Table name a record must carry in order to pass the filter. */
    private final String tableName;

    /**
     * Creates a filter for the given table.
     *
     * @param tableName table name to keep; must not be {@code null}
     * @throws NullPointerException if {@code tableName} is {@code null}
     */
    public TableDataFilter(String tableName) {
        // Fail fast at job-construction time rather than on every record at runtime.
        if (tableName == null) {
            throw new NullPointerException("tableName");
        }
        this.tableName = tableName;
    }

    /**
     * Decides whether a record belongs to the configured table.
     *
     * @param value the record as a JSON string
     * @return {@code true} if the second dot-separated segment of the record's
     *         {@code dbTable} field equals the configured table name; {@code false}
     *         otherwise, including when the field is missing or has no second segment
     * @throws Exception if {@code value} is not valid JSON (propagated from the parser)
     */
    @Override
    public boolean filter(String value) throws Exception {
        if (value == null) {
            return false;
        }

        // The parser may hand back null for empty text, so guard before dereferencing.
        JSONObject record = JSONObject.parseObject(value);
        if (record == null) {
            return false;
        }

        String dbTable = record.getString("dbTable");
        if (dbTable == null) {
            log.debug("Dropping record without a dbTable field");
            return false;
        }

        // split() discards trailing empty strings, so "db" and "db." both yield a
        // single element; the length check covers every "no second segment" case.
        String[] segments = dbTable.split("\\.");
        if (segments.length < 2) {
            log.debug("Dropping record with unexpected dbTable value: {}", dbTable);
            return false;
        }

        return tableName.equals(segments[1]);
    }
}
